﻿//-----------------------------------------------------------------------
// <copyright file="FlowPrefixAndTailSpec.cs" company="Akka.NET Project">
//     Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
//     Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
// ReSharper disable InvokeAsExtensionMethod

namespace Akka.Streams.Tests.Dsl
{
    public class FlowPrefixAndTailSpec : AkkaSpec
    {
        /// <summary>
        /// Materializer used to run the streams in these tests; assigned in the constructor.
        /// </summary>
        public ActorMaterializer Materializer { get; set; }

        /// <summary>
        /// Creates the spec and builds the <see cref="Materializer"/> from settings derived
        /// from the test actor system with <c>WithInputBuffer(2, 2)</c> applied.
        /// </summary>
        /// <param name="helper">xUnit output helper forwarded to the base spec.</param>
        public FlowPrefixAndTailSpec(ITestOutputHelper helper)
            : base(helper)
        {
            Materializer = ActorMaterializer.Create(
                Sys,
                ActorMaterializerSettings.Create(Sys).WithInputBuffer(2, 2));
        }

        /// <summary>
        /// Shared exception instance (message "test") that the OnError tests send as the
        /// upstream error and compare against the error observed by the subscriber.
        /// </summary>
        private static readonly TestException TestException = new("test");

        /// <summary>
        /// Builds a new <c>Sink.First</c> on every access. The sink accepts the
        /// (prefix, tail source) tuples emitted by <c>PrefixAndTail</c> and its
        /// materialized value is a <see cref="Task{TResult}"/> of such a tuple.
        /// </summary>
        private static Sink<(IImmutableList<int>, Source<int, NotUsed>), Task<(IImmutableList<int>, Source<int, NotUsed>)>> NewHeadSink
        {
            get { return Sink.First<(IImmutableList<int>, Source<int, NotUsed>)>(); }
        }


        /// <summary>
        /// Runs <c>PrefixAndTail(10)</c> over an empty source and verifies that the prefix is
        /// empty and that the tail source completes right after it is subscribed.
        /// </summary>
        [Fact]
        public async Task PrefixAndTail_must_work_on_empty_input()
        {
            await this.AssertAllStagesStoppedAsync(async () =>
            {
                var fut = Source.Empty<int>().PrefixAndTail(10).RunWith(NewHeadSink, Materializer);

                // Await with a timeout instead of blocking a thread on Wait()/Result.
                var (prefix, tailFlow) = await fut.WaitAsync(3.Seconds());
                prefix.Should().BeEmpty();

                var tailSubscriber = this.CreateManualSubscriberProbe<int>();
                tailFlow.To(Sink.FromSubscriber(tailSubscriber)).Run(Materializer);
                await tailSubscriber.ExpectSubscriptionAndCompleteAsync();
            }, Materializer);
        }

        /// <summary>
        /// Runs <c>PrefixAndTail(10)</c> over a three-element source and verifies that all
        /// elements land in the prefix and that the tail completes right after subscription.
        /// </summary>
        [Fact]
        public async Task PrefixAndTail_must_work_on_short_inputs()
        {
            await this.AssertAllStagesStoppedAsync(async () =>
            {
                var fut = Source.From(new[] { 1, 2, 3 }).PrefixAndTail(10).RunWith(NewHeadSink, Materializer);

                // Await with a timeout instead of blocking a thread on Wait()/Result.
                var (prefix, tailFlow) = await fut.WaitAsync(3.Seconds());
                prefix.Should().BeEquivalentTo(new[] { 1, 2, 3 });

                var tailSubscriber = this.CreateManualSubscriberProbe<int>();
                tailFlow.To(Sink.FromSubscriber(tailSubscriber)).Run(Materializer);
                await tailSubscriber.ExpectSubscriptionAndCompleteAsync();
            }, Materializer);
        }

        /// <summary>
        /// Runs <c>PrefixAndTail(5)</c> over the numbers 1..10 and verifies that the prefix is
        /// 1..5 and that the tail, grouped into one batch, yields 6..10.
        /// </summary>
        [Fact]
        public async Task PrefixAndTail_must_work_on_longer_inputs()
        {
            await this.AssertAllStagesStoppedAsync(async () =>
            {
                var fut = Source.From(Enumerable.Range(1, 10)).PrefixAndTail(5).RunWith(NewHeadSink, Materializer);

                // Await with a timeout instead of blocking a thread on Wait()/Result.
                var (takes, tail) = await fut.WaitAsync(3.Seconds());
                takes.Should().Equal(Enumerable.Range(1, 5));

                // Group size 6 exceeds the 5 remaining elements, so the first group holds the whole tail.
                var fut2 = tail.Grouped(6).RunWith(Sink.First<IEnumerable<int>>(), Materializer);
                var rest = await fut2.WaitAsync(3.Seconds());
                rest.Should().BeEquivalentTo(Enumerable.Range(6, 5));
            }, Materializer);
        }

        /// <summary>
        /// Runs <c>PrefixAndTail(0)</c> over the numbers 1..10 and verifies that the prefix is
        /// empty and that the tail, grouped into one batch, yields all of 1..10.
        /// </summary>
        [Fact]
        public async Task PrefixAndTail_must_handle_zero_take_count()
        {
            await this.AssertAllStagesStoppedAsync(async () =>
            {
                var fut = Source.From(Enumerable.Range(1, 10)).PrefixAndTail(0).RunWith(NewHeadSink, Materializer);

                // Await with a timeout instead of blocking a thread on Wait()/Result.
                var (prefix, tail) = await fut.WaitAsync(3.Seconds());
                prefix.Should().BeEmpty();

                // Group size 11 exceeds the 10 elements, so the first group holds the whole tail.
                var fut2 = tail.Grouped(11).RunWith(Sink.First<IEnumerable<int>>(), Materializer);
                var rest = await fut2.WaitAsync(3.Seconds());
                rest.Should().BeEquivalentTo(Enumerable.Range(1, 10));
            }, Materializer);
        }

        /// <summary>
        /// Runs <c>PrefixAndTail(-1)</c> over the numbers 1..10 and verifies that the prefix is
        /// empty and that the tail, grouped into one batch, yields all of 1..10.
        /// </summary>
        [Fact]
        public async Task PrefixAndTail_must_handle_negative_take_count()
        {
            await this.AssertAllStagesStoppedAsync(async () =>
            {
                var fut = Source.From(Enumerable.Range(1, 10)).PrefixAndTail(-1).RunWith(NewHeadSink, Materializer);

                // Await with a timeout instead of blocking a thread on Wait()/Result.
                var (prefix, tail) = await fut.WaitAsync(3.Seconds());
                prefix.Should().BeEmpty();

                // Group size 11 exceeds the 10 elements, so the first group holds the whole tail.
                var fut2 = tail.Grouped(11).RunWith(Sink.First<IEnumerable<int>>(), Materializer);
                var rest = await fut2.WaitAsync(3.Seconds());
                rest.Should().BeEquivalentTo(Enumerable.Range(1, 10));
            }, Materializer);
        }

        /// <summary>
        /// Runs <c>PrefixAndTail(10)</c> over the numbers 1..10 and verifies that the prefix holds
        /// every element and that the tail completes right after subscription.
        /// </summary>
        [Fact]
        public async Task PrefixAndTail_must_work_if_size_of_tak_is_equal_to_stream_size()
        {
            await this.AssertAllStagesStoppedAsync(async () =>
            {
                var fut = Source.From(Enumerable.Range(1, 10)).PrefixAndTail(10).RunWith(NewHeadSink, Materializer);

                // Await with a timeout instead of blocking a thread on Wait()/Result.
                var (prefix, tail) = await fut.WaitAsync(3.Seconds());
                prefix.Should().BeEquivalentTo(Enumerable.Range(1, 10));

                var subscriber = this.CreateManualSubscriberProbe<int>();
                tail.To(Sink.FromSubscriber(subscriber)).Run(Materializer);
                await subscriber.ExpectSubscriptionAndCompleteAsync();
            }, Materializer);
        }

        /// <summary>
        /// Materializes the same tail source twice and verifies that one of the two subscribers
        /// receives the "materialized more than once" error while the other still gets the
        /// remaining element and completion.
        /// </summary>
        [Fact]
        public async Task PrefixAndTail_must_throw_if_tail_is_attempted_to_be_materialized_twice()
        {
            await this.AssertAllStagesStoppedAsync(async () =>
            {
                var headTask = Source.From(Enumerable.Range(1, 2))
                    .PrefixAndTail(1)
                    .RunWith(NewHeadSink, Materializer);
                var (prefix, tail) = await headTask.WaitAsync(3.Seconds());
                prefix.Should().BeEquivalentTo(Enumerable.Range(1, 1));

                var firstProbe = this.CreateSubscriberProbe<int>();
                tail.To(Sink.FromSubscriber(firstProbe)).Run(Materializer);
                await firstProbe.EnsureSubscriptionAsync();

                var secondProbe = this.CreateSubscriberProbe<int>();
                tail.To(Sink.FromSubscriber(secondProbe)).Run(Materializer);
                await secondProbe.EnsureSubscriptionAsync();

                // One of the subscribers must fail, but we do not know which one in advance,
                // so wait for whichever error expectation finishes first.
                TestSubscriber.Probe<int> survivor;
                using (var cts = new CancellationTokenSource())
                {
                    var firstError = firstProbe.ExpectErrorAsync(cts.Token);
                    var secondError = secondProbe.ExpectErrorAsync(cts.Token);
                    var rejected = await Task.WhenAny(firstError, secondError);

                    // Stop the expectation that is still pending on the other probe.
                    await cts.CancelAsync();

                    var error = await rejected;
                    error.Message.Should().Be("Substream Source cannot be materialized more than once");

                    if (rejected == firstError)
                    {
                        survivor = secondProbe;
                    }
                    else
                    {
                        survivor = firstProbe;
                    }
                }

                await survivor.RequestNext(2).ExpectCompleteAsync();
            }, Materializer);
        }
        
        /// <summary>
        /// Uses a materializer with a 300 ms subscription timeout in <c>CancelTermination</c> mode,
        /// subscribes to the tail only after one second, and verifies that the late subscriber
        /// receives the "has not been materialized" error.
        /// </summary>
        [Fact]
        public async Task PrefixAndTail_must_signal_error_if_substream_has_been_not_subscribed_in_time()
        {
            await this.AssertAllStagesStoppedAsync(async () =>
            {
                var ms = 300;

                var settings = ActorMaterializerSettings.Create(Sys)
                    .WithSubscriptionTimeoutSettings(
                        new StreamSubscriptionTimeoutSettings(
                            StreamSubscriptionTimeoutTerminationMode.CancelTermination,
                            TimeSpan.FromMilliseconds(ms)));
                var tightTimeoutMaterializer = ActorMaterializer.Create(Sys, settings);

                var fut = Source.From(Enumerable.Range(1, 2)).PrefixAndTail(1).RunWith(NewHeadSink, tightTimeoutMaterializer);

                // Await with a timeout instead of blocking a thread on Wait()/Result.
                var (prefix, tail) = await fut.WaitAsync(3.Seconds());
                prefix.Should().BeEquivalentTo(Enumerable.Range(1, 1));

                var subscriber = this.CreateSubscriberProbe<int>();

                // Let the 300 ms subscription timeout elapse without blocking the test thread.
                await Task.Delay(1000);

                tail.To(Sink.FromSubscriber(subscriber)).Run(tightTimeoutMaterializer);
                (await subscriber.ExpectSubscriptionAndErrorAsync())
                    .Message.Should()
                    .Be("Substream Source has not been materialized in 00:00:00.3000000");
            }, Materializer);
        }

        /// <summary>
        /// Uses a materializer with a 1 ms subscription timeout in <c>NoopTermination</c> mode,
        /// subscribes to the tail after 200 ms, and verifies that the late subscriber still
        /// receives the remaining element and completion.
        /// </summary>
        [Fact]
        public async Task PrefixAndTail_must_not_fail_the_stream_if_substream_has_not_been_subscribed_in_time_and_configured_subscription_timeout_is_noop()
        {
            await this.AssertAllStagesStoppedAsync(async () =>
            {
                var settings = ActorMaterializerSettings.Create(Sys)
                    .WithSubscriptionTimeoutSettings(
                        new StreamSubscriptionTimeoutSettings(
                            StreamSubscriptionTimeoutTerminationMode.NoopTermination,
                            TimeSpan.FromMilliseconds(1)));
                var tightTimeoutMaterializer = ActorMaterializer.Create(Sys, settings);

                var fut = Source.From(Enumerable.Range(1, 2)).PrefixAndTail(1).RunWith(NewHeadSink, tightTimeoutMaterializer);

                // Await with a timeout instead of blocking a thread on Wait()/Result.
                var (prefix, tail) = await fut.WaitAsync(3.Seconds());
                prefix.Should().BeEquivalentTo(Enumerable.Range(1, 1));

                var subscriber = this.CreateSubscriberProbe<int>();

                // Let the 1 ms subscription timeout elapse without blocking the test thread.
                await Task.Delay(200);

                tail.To(Sink.FromSubscriber(subscriber)).Run(tightTimeoutMaterializer);
                (await subscriber.ExpectSubscriptionAsync()).Request(2);
                await subscriber.ExpectNext(2).ExpectCompleteAsync();
            }, Materializer);
        }

        /// <summary>
        /// Runs <c>PrefixAndTail(1)</c> over a single-element source, never subscribes to the
        /// tail, and relies on the surrounding all-stages-stopped assertion to verify shutdown.
        /// </summary>
        [Fact]
        public async Task PrefixAndTail_must_shut_down_main_stage_if_substream_is_empty_even_when_not_subscribed()
        {
            await this.AssertAllStagesStoppedAsync(async () =>
            {
                var fut = Source.Single(1).PrefixAndTail(1).RunWith(NewHeadSink, Materializer);

                // Await with a timeout instead of blocking a thread on Wait()/Result.
                var (prefix, _) = await fut.WaitAsync(3.Seconds());
                prefix.Should().ContainSingle(i => i == 1);
            }, Materializer);
        }

        /// <summary>
        /// Fails the upstream while the prefix of three is still incomplete (one element sent)
        /// and verifies that the downstream subscriber receives that same exception.
        /// </summary>
        [Fact]
        public async Task PrefixAndTail_must_handle_OnError_when_no_substream_is_open()
        {
            await this.AssertAllStagesStoppedAsync(async () =>
            {
                var publisher = this.CreateManualPublisherProbe<int>();
                var subscriber = this.CreateManualSubscriberProbe<(IImmutableList<int>, Source<int, NotUsed>)>();

                Source.FromPublisher(publisher)
                    .PrefixAndTail(3)
                    .To(Sink.FromSubscriber(subscriber))
                    .Run(Materializer);

                var upstream = await publisher.ExpectSubscriptionAsync();
                var downstream = await subscriber.ExpectSubscriptionAsync();

                downstream.Request(1);

                await upstream.ExpectRequestAsync();
                upstream.SendNext(1);
                upstream.SendError(TestException);

                // Use the async expectation rather than blocking inside the async lambda.
                (await subscriber.ExpectErrorAsync()).Should().Be(TestException);
            }, Materializer);
        }

        /// <summary>
        /// Subscribes to the tail after the (prefix, tail) pair has been emitted, then fails the
        /// upstream and verifies that the tail subscriber receives that same exception.
        /// </summary>
        [Fact]
        public async Task PrefixAndTail_must_handle_OnError_when_substream_is_open()
        {
            await this.AssertAllStagesStoppedAsync(async () =>
            {
                var publisher = this.CreateManualPublisherProbe<int>();
                var subscriber = this.CreateManualSubscriberProbe<(IImmutableList<int>, Source<int, NotUsed>)>();

                Source.FromPublisher(publisher)
                    .PrefixAndTail(1)
                    .To(Sink.FromSubscriber(subscriber))
                    .Run(Materializer);

                var upstream = await publisher.ExpectSubscriptionAsync();
                var downstream = await subscriber.ExpectSubscriptionAsync();

                downstream.Request(1000);

                await upstream.ExpectRequestAsync();
                upstream.SendNext(1);

                var (prefix, tail) = await subscriber.ExpectNextAsync();
                prefix.Should().ContainSingle(i => i == 1);
                await subscriber.ExpectCompleteAsync();

                var substreamSubscriber = this.CreateManualSubscriberProbe<int>();
                tail.To(Sink.FromSubscriber(substreamSubscriber)).Run(Materializer);
                await substreamSubscriber.ExpectSubscriptionAsync();
                upstream.SendError(TestException);

                // Use the async expectation rather than blocking inside the async lambda.
                (await substreamSubscriber.ExpectErrorAsync()).Should().Be(TestException);
            }, Materializer);
        }

        /// <summary>
        /// Cancels the downstream subscription while the prefix of three is still incomplete
        /// (one element sent) and verifies that the upstream subscription sees the cancellation.
        /// </summary>
        [Fact]
        public async Task PrefixAndTail_must_handle_master_stream_cancellation()
        {
            await this.AssertAllStagesStoppedAsync(async () =>
            {
                var upstreamProbe = this.CreateManualPublisherProbe<int>();
                var downstreamProbe = this.CreateManualSubscriberProbe<(IImmutableList<int>, Source<int, NotUsed>)>();

                var graph = Source.FromPublisher(upstreamProbe)
                    .PrefixAndTail(3)
                    .To(Sink.FromSubscriber(downstreamProbe));
                graph.Run(Materializer);

                var upstreamSub = await upstreamProbe.ExpectSubscriptionAsync();
                var downstreamSub = await downstreamProbe.ExpectSubscriptionAsync();

                // Demand one (prefix, tail) pair so the stage starts pulling from upstream.
                downstreamSub.Request(1);
                await upstreamSub.ExpectRequestAsync();
                upstreamSub.SendNext(1);

                downstreamSub.Cancel();
                await upstreamSub.ExpectCancellationAsync();
            }, Materializer);
        }

        /// <summary>
        /// Subscribes to the tail after the (prefix, tail) pair has been emitted, cancels that
        /// tail subscription, and verifies that the upstream subscription sees the cancellation.
        /// </summary>
        [Fact]
        public async Task PrefixAndTail_must_handle_substream_cancellation()
        {
            await this.AssertAllStagesStoppedAsync(async () =>
            {
                var upstreamProbe = this.CreateManualPublisherProbe<int>();
                var downstreamProbe = this.CreateManualSubscriberProbe<(IImmutableList<int>, Source<int, NotUsed>)>();

                var graph = Source.FromPublisher(upstreamProbe)
                    .PrefixAndTail(1)
                    .To(Sink.FromSubscriber(downstreamProbe));
                graph.Run(Materializer);

                var upstreamSub = await upstreamProbe.ExpectSubscriptionAsync();
                var downstreamSub = await downstreamProbe.ExpectSubscriptionAsync();

                downstreamSub.Request(1000);
                await upstreamSub.ExpectRequestAsync();
                upstreamSub.SendNext(1);

                var emitted = await downstreamProbe.ExpectNextAsync();
                emitted.Item1.Should().ContainSingle(i => i == 1);
                await downstreamProbe.ExpectCompleteAsync();

                var tailProbe = this.CreateManualSubscriberProbe<int>();
                emitted.Item2.To(Sink.FromSubscriber(tailProbe)).Run(Materializer);
                var tailSub = await tailProbe.ExpectSubscriptionAsync();
                tailSub.Cancel();

                await upstreamSub.ExpectCancellationAsync();
            }, Materializer);
        }

        /// <summary>
        /// Cancels the downstream subscription before any publisher is attached, then attaches
        /// the publisher and verifies that its subscription sees the cancellation.
        /// </summary>
        [Fact]
        public async Task PrefixAndTail_must_pass_along_early_cancellation()
        {
            await this.AssertAllStagesStoppedAsync(async () =>
            {
                var upstreamProbe = this.CreateManualPublisherProbe<int>();
                var downstreamProbe = this.CreateManualSubscriberProbe<(IImmutableList<int>, Source<int, NotUsed>)>();

                var graph = Source.AsSubscriber<int>()
                    .PrefixAndTail(1)
                    .To(Sink.FromSubscriber(downstreamProbe));
                var flowSubscriber = graph.Run(Materializer);

                // Cancel first; the upstream publisher is only connected afterwards.
                var downstreamSub = await downstreamProbe.ExpectSubscriptionAsync();
                downstreamSub.Cancel();

                upstreamProbe.Subscribe(flowSubscriber);
                var upstreamSub = await upstreamProbe.ExpectSubscriptionAsync();
                await upstreamSub.ExpectCancellationAsync();
            }, Materializer);
        }

        /// <summary>
        /// Runs the tail into a publisher sink, completes the upstream, and only then subscribes
        /// to that publisher; verifies the late subscriber gets a subscription and completion.
        /// </summary>
        [Fact]
        public async Task PrefixAndTail_must_work_even_if_tail_subscriber_arrives_after_substream_completion()
        {
            var pub = this.CreateManualPublisherProbe<int>();
            var sub = this.CreateManualSubscriberProbe<int>();

            var f = Source.FromPublisher(pub)
                .PrefixAndTail(1)
                .RunWith(NewHeadSink, Materializer);
            var s = await pub.ExpectSubscriptionAsync();
            s.SendNext(0);

            // Await with a timeout instead of blocking a thread on Wait()/Result.
            var (_, tail) = await f.WaitAsync(3.Seconds());
            var tailPub = tail.RunWith(Sink.AsPublisher<int>(false), Materializer);
            s.SendComplete();

            // The subscriber is attached only after the upstream has already completed.
            tailPub.Subscribe(sub);
            await sub.ExpectSubscriptionAndCompleteAsync();
        }
    }
}
